package org.ricks.net;

import org.ricks.common.ObjectPool;
import org.ricks.common.PooledObject;
import org.ricks.common.ReItrLinkedList;
import org.ricks.common.ReusableListIterator;
import org.ricks.log.Logger;

import java.nio.ByteBuffer;
import java.util.*;

/**
 * Java implementation of <a href="https://github.com/skywind3000/kcp">KCP</a>
 *
 * @author <a href="mailto:szhnet@gmail.com">szh</a>
 */
public abstract class Kcp {


    /**
     * no delay min rto
     */
    public static final int IKCP_RTO_NDL = 30;

    /**
     * normal min rto
     */
    public static final int IKCP_RTO_MIN = 100;

    public static final int IKCP_RTO_DEF = 200;

    public static final int IKCP_RTO_MAX = 60000;

    /**
     * cmd: push data
     */
    public static final byte IKCP_CMD_PUSH = 81;

    /**
     * cmd: ack
     */
    public static final byte IKCP_CMD_ACK = 82;

    /**
     * cmd: window probe (ask)
     * 询问对方当前剩余窗口大小 请求
     */
    public static final byte IKCP_CMD_WASK = 83;

    /**
     * cmd: window size (tell)
     * 返回本地当前剩余窗口大小
     */
    public static final byte IKCP_CMD_WINS = 84;

    /**
     * need to send IKCP_CMD_WASK
     */
    public static final int IKCP_ASK_SEND = 1;

    /**
     * need to send IKCP_CMD_WINS
     */
    public static final int IKCP_ASK_TELL = 2;

    public static final int IKCP_WND_SND = 1024;

    public static final int IKCP_WND_RCV = 1024;

    public static final int IKCP_MTU_DEF = 1400;

    public static final int IKCP_INTERVAL = 100;

    public static int IKCP_OVERHEAD = 24;

    public static final int IKCP_DEADLINK = 20;

    public static final int IKCP_THRESH_INIT = 2;

    public static final int IKCP_THRESH_MIN = 2;

    /**
     * 7 secs to probe window size
     */
    public static final int IKCP_PROBE_INIT = 7000;

    /**
     * up to 120 secs to probe window
     */
    public static final int IKCP_PROBE_LIMIT = 120000;

    public static final int IKCP_SN_OFFSET   = 12;

    private long current = 0;

    private int ackMaskSize = 0;
    /**会话id**/
    private int conv;
    /**最大传输单元**/
    private int mtu = IKCP_MTU_DEF;
    /**最大分节大小  mtu减去头等部分**/
    private int mss = this.mtu - IKCP_OVERHEAD;
    /**状态**/
    private int state;
    /**已发送但未确认**/
    private long sndUna;
    /**下次发送下标**/
    private long sndNxt;
    /**下次接收下标**/
    private long rcvNxt;
    /**上次ack时间**/
    private long tsLastack;
    /**慢启动门限**/
    private int ssthresh = IKCP_THRESH_INIT;
    /**RTT(Round Trip Time)**/
    private int rxRttval;
    /**SRTT平滑RTT*/
    private int rxSrtt;
    /**RTO重传超时*/
    private int rxRto = IKCP_RTO_DEF;
    /**MinRTO最小重传超时*/
    private int rxMinrto = IKCP_RTO_MIN;
    /**发送窗口**/
    private int sndWnd = IKCP_WND_SND;
    /**接收窗口**/
    private int rcvWnd = IKCP_WND_RCV;
    /**当前对端可接收窗口**/
    private int rmtWnd = IKCP_WND_RCV;

    /**拥塞控制窗口**/
    private int cwnd;
    /**探测标志位**/
    private int probe;
    ///**当前时间**/
    //private long current;
    /**间隔**/
    private int interval = IKCP_INTERVAL;
    /**发送**/
    private long tsFlush = IKCP_INTERVAL;
    /**是否无延迟 0不启用；1启用**/
    private boolean nodelay;
    /**状态是否已更新**/
    private boolean updated;
    /**探测时间**/
    private long tsProbe;
    /**探测等待**/
    private int probeWait;
    /**死连接 重传达到该值时认为连接是断开的**/
    private int deadLink = IKCP_DEADLINK;
    /**拥塞控制增量**/
    private long incr;
    /**收到包立即回ack**/
    private boolean ackNoDelay;

    /**待发送窗口窗口**/
    private LinkedList<Segment> sndQueue = new LinkedList<>();
    /**发送后待确认的队列**/
    private ReItrLinkedList<Segment> sndBuf = new ReItrLinkedList<>();

    /**收到后有序的队列**/
    private ReItrLinkedList<Segment> rcvQueue = new ReItrLinkedList<>();
    /**收到的消息 无序的**/
    private ReItrLinkedList<Segment> rcvBuf = new ReItrLinkedList<>();

    private ReusableListIterator<Segment> rcvQueueItr = rcvQueue.listIterator();

    public ReusableListIterator<Segment> sndBufItr = sndBuf.listIterator();

    private ReusableListIterator<Segment> rcvBufItr = rcvBuf.listIterator();

    private List acklist = new ArrayList();

    private int ackcount;

    private Object user;
    /**是否快速重传 默认0关闭，可以设置2（2次ACK跨越将会直接重传）**/
    private int fastresend;
    /**是否关闭拥塞控制窗口**/
    private boolean nocwnd;
    /**是否流传输**/
    private boolean stream;

    /**头部预留长度  为fec checksum准备**/
    private int reserved;

    private IoBuffer buffer = new IoBuffer(IKCP_MTU_DEF);

    protected abstract void out(ByteBuffer data);

    private static long long2Uint(long n) {
        return n & 0x00000000FFFFFFFFL;
    }

    private static int ibound(int lower, int middle, int upper) {
        return Math.min(Math.max(lower, middle), upper);
    }

    private static int itimediff(long later, long earlier) {
        return (int) (later - earlier);
    }


    public void release() {
        release(sndBuf);
        release(rcvBuf);
        release(sndQueue);
        release(rcvQueue);
    }

    private void release(List<Segment> segQueue) {
        segQueue.forEach(segment -> segment.closeSegment());
    }

    /**
     * 1，判断是否有完整的包，如果有就抛给下一层
     * 2，整理消息接收队列，判断下一个包是否已经收到 收到放入rcvQueue
     * 3，判断接收窗口剩余是否改变，如果改变记录需要通知
     * @param buffer
     * @return
     */
    public int recv(ByteBuffer buffer) {
        if (rcvQueue.isEmpty()) {
            return -1;
        }
        int peekSize = peekSize();

        if (peekSize < 0) {
            return -2;
        }
        //接收队列长度大于接收窗口？比如接收窗口是32个包，目前已经满32个包了，需要在恢复的时候告诉对方
        boolean recover = false;
        if (rcvQueue.size() >= rcvWnd) {
            recover = true;
        }

        // merge fragment
        int len = 0;
        for (Iterator<Segment> itr = rcvQueueItr.rewind(); itr.hasNext(); ) {
            Segment seg = itr.next();
            len += seg.data.readableBytes();
            buffer.put(seg.data.array());

            int fragment = seg.frg;

            // log
            Logger.debug("{} recv sn={}", this, seg.sn);

            itr.remove();
            seg.closeSegment();

            if (fragment == 0) {
                break;
            }
        }

        assert len == peekSize;

        // move available data from rcv_buf -> rcv_queue
        moveRcvData();

        // fast recover接收队列长度小于接收窗口，说明还可以接数据，已经恢复了，在下次发包的时候告诉对方本方的窗口
        if (rcvQueue.size() < rcvWnd && recover) {
            // ready to send back IKCP_CMD_WINS in ikcp_flush
            // tell remote my window size
            probe |= IKCP_ASK_TELL;
        }

        return len;
    }

    /**
     * check the size of next message in the recv queue
     * 检查接收队列里面是否有完整的一个包，如果有返回该包的字节长度
     * @return -1 没有完整包， >0 一个完整包所含字节
     */
    public int peekSize() {
        if (rcvQueue.isEmpty()) {
            return -1;
        }

        Segment seg = rcvQueue.peek();
        //第一个包是一条应用层消息的最后一个分包？一条消息只有一个包的情况？
        if (seg.frg == 0) {
            return seg.data.readableBytes();
        }
        //接收队列长度小于应用层消息分包数量？接收队列空间不够用于接收完整的一个消息？
        if (rcvQueue.size() < seg.frg + 1) { // Some segments have not arrived yet
            return -1;
        }

        int len = 0;
        for (Iterator<Segment> itr = rcvQueueItr.rewind(); itr.hasNext(); ) {
            Segment s = itr.next();
            len += s.data.readableBytes();
            if (s.frg == 0) {
                break;
            }
        }

        return len;
    }

    /**
     * 判断一条消息是否完整收全了
     * @return
     */
    public boolean canRecv() {
        if (rcvQueue.isEmpty()) {
            return false;
        }

        Segment seg = rcvQueue.peek();
        if (seg.frg == 0) {
            return true;
        }

        if (rcvQueue.size() < seg.frg + 1) { // Some segments have not arrived yet
            return false;
        }

        return true;
    }


    public int send(byte[] data) {
        assert mss > 0;

        int len = data.length;
        if (len == 0) {
            return -1;
        }

        // append to previous segment in streaming mode (if possible)
        int index = 0;
        if (stream && !sndQueue.isEmpty()) {
            Segment seg = sndQueue.peekLast();
            index = Math.min(len, seg.data.writableBytes());
            seg.data.put(data, 0, index);
        }

        int left = len - index;
        int count = left <= mss ? 1 : (left + mss - 1) / mss;

        if (count > 255) { // Maybe don't need the conditon in stream mode
            return -2;
        }

        // segment
        for (int i = 0; i < count; i++) {
            int size = left > mss ? mss : left;
            Segment seg = Segment.createSegment();
            seg.conv=conv;
            seg.frg =  (byte) (stream ? 0 : count - i - 1);
            seg.data = new IoBuffer(size);
            seg.data.put(data,index,size);
            index += size;
            left -= size;
            sndQueue.add(seg);
        }
        return 0;
    }

    /**
     * update ack.
     * parse ack根据RTT计算SRTT和RTO即重传超时
     * @param rtt
     */
    private void updateAck(int rtt) {
        if (rxSrtt == 0) {
            rxSrtt = rtt;
            rxRttval = rtt >> 2;
        } else {
            int delta = rtt - rxSrtt;
            rxSrtt += delta>>3;
            delta = Math.abs(delta);
            if (rtt < rxSrtt - rxRttval) {
                rxRttval += ( delta - rxRttval)>>5;
            } else {
                rxRttval += (delta - rxRttval) >>2;
            }
        }
        int rto = rxSrtt + Math.max(interval, rxRttval<<2);
        rxRto = ibound(rxMinrto, rto, IKCP_RTO_MAX);
    }

    private void shrinkBuf() {
        if (sndBuf.size() > 0) {
            Segment seg = sndBuf.peek();
            sndUna = seg.sn;
        } else {
            sndUna = sndNxt;
        }
    }

    private void parseAck(long sn) {
        if (itimediff(sn, sndUna) < 0 || itimediff(sn, sndNxt) >= 0) {
            return;
        }

        for (Iterator<Segment> itr = sndBufItr.rewind(); itr.hasNext(); ) {
            Segment seg = itr.next();
            if (sn == seg.sn) {
                itr.remove();
                seg.closeSegment();
                break;
            }
            if (itimediff(sn, seg.sn) < 0) {
                break;
            }
        }
    }

    private int parseUna(long una) {
        int count = 0;
        for (Iterator<Segment> itr = sndBufItr.rewind(); itr.hasNext(); ) {
            Segment seg = itr.next();
            if (itimediff(una, seg.sn) > 0) {
                count++;
                itr.remove();
                seg.closeSegment();
            } else {
                break;
            }
        }
        return count;
    }

    private void parseAckMask(long una,long ackMask){
        if(ackMask==0)
        {
            return;
        }
        for (Iterator<Segment> itr = sndBufItr.rewind(); itr.hasNext(); ) {
            Segment seg = itr.next();
            long index = seg.sn-una-1;
            if(index<0){
                continue;
            }
            if(index>=ackMaskSize) {
                break;
            }
            long mask = ackMask&1<<index;
            if(mask!=0){
                itr.remove();
                seg.closeSegment();
            }
        }
    }


    private void parseFastack(long sn,long ts) {
        if (itimediff(sn, sndUna) < 0 || itimediff(sn, sndNxt) >= 0) {
            return;
        }

        for (Iterator<Segment> itr = sndBufItr.rewind(); itr.hasNext(); ) {
            Segment seg = itr.next();
            if (itimediff(sn, seg.sn) < 0) {
                break;
                //根据时间判断  在当前包时间之前的包才能被认定是需要快速重传的
            } else if (sn != seg.sn&& itimediff(seg.ts, ts) <= 0) {
                seg.fastack++;
            }
        }
    }

    private void ackPush(long sn, long ts) {
        acklist.add(sn);
        acklist.add(ts);
    }

    private boolean parseData(Segment newSeg) {
        long sn = newSeg.sn;

        if (itimediff(sn, rcvNxt + rcvWnd) >= 0 || itimediff(sn, rcvNxt) < 0) {
            newSeg.closeSegment();
            return true;
        }

        boolean repeat = false;
        boolean findPos = false;
        ListIterator<Segment> listItr = null;
        if (rcvBuf.size() > 0) {
            listItr = rcvBufItr.rewind(rcvBuf.size());
            while (listItr.hasPrevious()) {
                Segment seg = listItr.previous();
                if (seg.sn == sn) {
                    repeat = true;
                    //Snmp.snmp.RepeatSegs.incrementAndGet();
                    break;
                }
                if (itimediff(sn, seg.sn) > 0) {
                    findPos = true;
                    break;
                }
            }
        }

        if (repeat) {
            newSeg.closeSegment();
        } else if (listItr == null) {
            rcvBuf.add(newSeg);
        } else {
            if (findPos) {
                listItr.next();
            }
            listItr.add(newSeg);
        }

        // move available data from rcv_buf -> rcv_queue
        moveRcvData(); // Invoke the method only if the segment is not repeat?
        return repeat;
    }

    private void moveRcvData() {
        for (Iterator<Segment> itr = rcvBufItr.rewind(); itr.hasNext(); ) {
            Segment seg = itr.next();
            if (seg.sn == rcvNxt && rcvQueue.size() < rcvWnd) {
                itr.remove();
                rcvQueue.add(seg);
                rcvNxt++;
            } else {
                break;
            }
        }
    }

    // when you received a low level packet (eg. UDP packet), call it
    //---------------------------------------------------------------------
    // input data
    //---------------------------------------------------------------------
    // 底层收包后调用，再由上层通过Recv获得处理后的数据
    public int input(ByteBuffer data) {
        long s_una = sndUna;
        if (data.remaining() < IKCP_OVERHEAD) {
            return -1;
        }
        Logger.debug("{} [RI] {} bytes", this, data.remaining());
        while (true) {
            long ts, sn, una;
            int wnd,len,conv_;
            byte cmd, frg;

            if (data.remaining() < IKCP_OVERHEAD) {
                break;
            }

            conv_ = data.getInt();
            if (conv != conv_) {
                Logger.error("this.conv: {} != {} ",conv,conv_);
                return -4;
            }

            cmd = data.get();
            frg = data.get();
            wnd = data.getShort();
            ts = data.getInt();
            sn = data.getInt();
            una = data.getInt();
            len = data.getInt();

            if (data.remaining() < len) {
                Logger.error("客户端告诉我包里有 {} 数据。但实际上 只有 {} ",len,data.remaining());
                return -2;
            }

            if (cmd != IKCP_CMD_PUSH && cmd != IKCP_CMD_ACK && cmd != IKCP_CMD_WASK && cmd != IKCP_CMD_WINS) {
                return -3;
            }

            rmtWnd =  wnd;
            parseUna(una);
            shrinkBuf();

            if (IKCP_CMD_ACK == cmd) {
                if (itimediff(current, ts) >= 0) {
                    updateAck(itimediff(current, ts));
                }
                parseAck(sn);
                shrinkBuf();
            } else if (IKCP_CMD_PUSH == cmd) {
                if (itimediff(sn, rcvNxt + rcvWnd) < 0) {
                    ackPush(sn, ts);
                    if (itimediff(sn, rcvNxt) >= 0) {
                        //val seg = Segment(conv, cmd, frg, wnd, ts, sn, una, IoBuffer(len))
                        Segment seg = Segment.createSegment();
                        seg.conv = conv_;
                        seg.cmd = cmd;
                        seg.frg = frg;
                        seg.wnd = (short) wnd;
                        seg.ts = ts;
                        seg.sn = sn;
                        seg.una = una;
                        seg.data = new IoBuffer(len);
                        if (len > 0) {
                            seg.data.put(data, len);
                        }
                        parseData(seg);
                    } else {
                        data.position(data.position() + len);
                    }
                } else {
                    data.position(data.position() + len);
                }
            } else if (IKCP_CMD_WASK == cmd) {
                // ready to send back IKCP_CMD_WINS in Ikcp_flush
                // tell remote my window size
                probe |= IKCP_ASK_TELL;
            } else if (IKCP_CMD_WINS == cmd) {
                // do nothing
            } else {
                return -3;
            }
        }

        if (itimediff(sndUna, s_una) > 0) {
            if (cwnd < rmtWnd) {
                long mss_ = mss;
                if (cwnd < ssthresh) {
                    cwnd++;
                    incr += mss_;
                } else {
                    if (incr < mss_) {
                        incr = mss_;
                    }
                    incr += (mss_ * mss_) / incr + (mss_ / 16);
                    if ((cwnd + 1) * mss_ <= incr) {
                        cwnd++;
                    }
                }
                if (cwnd > rcvWnd) {
                    cwnd = rcvWnd;
                    incr = rcvWnd * mss_;
                }
            }
        }
        return 0;
    }

    private int wndUnused() {
        if (rcvQueue.size() < rcvWnd) {
            return rcvWnd - rcvQueue.size();
        }
        return 0;
    }

    private  long startTicks = System.currentTimeMillis();

    public long currentMs(long now) {
        return now-startTicks;
    }


    /**
     * ikcp_flush
     */
    public long flush(long current) {

        current = currentMs(current);

        Segment seg = Segment.createSegment();
        seg.conv = conv;
        seg.cmd = IKCP_CMD_ACK;
        seg.wnd = (short) wndUnused();//可接收数量
        seg.una = rcvNxt;//已接收数量，下次要接收的包的sn，这sn之前的包都已经收到

        //计算ackMask
        int count = acklist.size() / 2;

        for (int i = 0; i < count; i++) {
            if (buffer.readableBytes() + IKCP_OVERHEAD > mtu) {
                out(buffer.toByteBuffer());
                buffer.clear(); // = IoBuffer((mtu + IKCP_OVERHEAD) )
            }
            //seg = Segment(conv = conv, cmd = IKCP_CMD_ACK, wnd = wnd_unused.toShort, una = rcv_nxt, sn = acklist(i * 2 + 0).toLong, ts = acklist(i * 2 + 1).toLong)
            seg.sn=  Long.parseLong(acklist.get(i * 2 + 0).toString());
            seg.ts = Long.parseLong(acklist.get(i * 2 + 1).toString());
            buffer.put(seg.encode());
        }

        acklist.clear();

        // probe window size (if remote window size equals zero)
        //拥堵控制 如果对方可接受窗口大小为0  需要询问对方窗口大小
        if (rmtWnd == 0) {
            if (probeWait == 0) {
                probeWait = IKCP_PROBE_INIT;
                tsProbe = current + probeWait;
            } else {
                if (itimediff(current, tsProbe) >= 0) {
                    if (probeWait < IKCP_PROBE_INIT) {
                        probeWait = IKCP_PROBE_INIT;
                    }
                    probeWait += probeWait / 2;
                    if (probeWait > IKCP_PROBE_LIMIT) {
                        probeWait = IKCP_PROBE_LIMIT;
                    }
                    tsProbe = current + probeWait;
                    probe |= IKCP_ASK_SEND;
                }
            }
        } else {
            tsProbe = 0;
            probeWait = 0;
        }

        // flush window probing commands
        if ((probe & IKCP_ASK_SEND) != 0) {
            seg.cmd = IKCP_CMD_WASK;
            //val seg = Segment(conv = conv, cmd = IKCP_CMD_WASK.toByte, wnd = segment.wnd, sn = segment.sn)
            if (buffer.readableBytes() + IKCP_OVERHEAD > mtu) {
                out(buffer.toByteBuffer());
                buffer.clear();// = IoBuffer((mtu + IKCP_OVERHEAD) )
            }
            buffer.put(seg.encode());
        }

        // flush window probing commands
        if ((probe & IKCP_ASK_TELL) != 0) {
            seg.cmd = IKCP_CMD_WINS;
            if (buffer.readableBytes() + IKCP_OVERHEAD > mtu) {
                out(buffer.toByteBuffer());
                buffer.clear();// = IoBuffer((mtu + IKCP_OVERHEAD) )
            }
            buffer.put(seg.encode());
        }

        probe = 0;

        // calculate window size
        int cwnd0 = Math.min(sndWnd, rmtWnd);
        if (!nocwnd) {
            cwnd0 = Math.min(this.cwnd, cwnd0);
        }

        int newSegsCount=0;
        // move data from snd_queue to snd_buf
        while (itimediff(sndNxt, sndUna + cwnd0) < 0) {
            Segment newSeg = sndQueue.poll();
            if (newSeg == null) {
                break;
            }
            newSeg.conv = conv;
            newSeg.cmd = IKCP_CMD_PUSH;
            newSeg.sn = sndNxt;
            sndBuf.add(newSeg);
            sndNxt++;
            newSegsCount++;
        }

        // calculate resent
        int resent = fastresend > 0 ? fastresend : Integer.MAX_VALUE;

        // flush data segments
        int change = 0;
        boolean lost = false;
        int lostSegs = 0, fastRetransSegs=0, earlyRetransSegs=0;
        long minrto = interval;


        for (Iterator<Segment> itr = sndBufItr.rewind(); itr.hasNext();) {
            Segment segment = itr.next();
            boolean needsend = false;
            if (segment.xmit == 0) {
                needsend = true;
                segment.rto = rxRto;
                segment.resendts = current + segment.rto;
                Logger.debug("{} flush data: sn={}, resendts={}", this, segment.sn, (segment.resendts - current));
            }  else if (segment.fastack >= resent) {
                needsend = true;
                segment.fastack = 0;
                segment.rto = rxRto;
                segment.resendts = current + segment.rto;
                change++;
                fastRetransSegs++;
                Logger.debug("{} fastresend. sn={}, xmit={}, resendts={} ", this, segment.sn, segment.xmit, (segment
                            .resendts - current));
            }
            else if(segment.fastack>0 &&newSegsCount==0){  // early retransmit
                needsend = true;
                segment.fastack = 0;
                segment.rto = rxRto;
                segment.resendts = current + segment.rto;
                change++;
                earlyRetransSegs++;
            }
            else if (itimediff(current, segment.resendts) >= 0) {
                needsend = true;
                if (!nodelay) {
                    segment.rto += rxRto;
                } else {
                    segment.rto += rxRto / 2;
                }
                segment.fastack = 0;
                segment.resendts = current + segment.rto;
                lost = true;
                lostSegs++;
                Logger.debug("{} resend. sn={}, xmit={}, resendts={}", this, segment.sn, segment.xmit, (segment
                            .resendts - current));
            }


            if (needsend) {
                segment.xmit++;
                segment.ts = long2Uint(current);
                segment.wnd = seg.wnd;
                segment.una = rcvNxt;

                int need = IKCP_OVERHEAD + segment.data.readableBytes();
                if (buffer.readableBytes() + need > mtu) {
                    out(buffer.toByteBuffer());
                    buffer.clear();// = IoBuffer((mtu + IKCP_OVERHEAD) )
                }
                if(conv!=segment.conv)  System.err.println("conv!=segment.conv");
                buffer.put(segment.encode());
                if (segment.data.readableBytes() > 0) buffer.put(segment.data.array());
                if (segment.xmit >= IKCP_DEADLINK) state = -1;
            }
        }

        // flash remain segments
        // flash remain segments
        if (buffer.readableBytes() > 0) {
            out(buffer.toByteBuffer());
            buffer.clear(); // = IoBuffer((mtu + IKCP_OVERHEAD) )
        }

        //不关闭快速重传
        if (change > 0) {
            int inflight = (int) (sndNxt - sndUna);
            ssthresh = inflight / 2;
            if (ssthresh < IKCP_THRESH_MIN) ssthresh = IKCP_THRESH_MIN;

            cwnd = ssthresh + resent;
            incr = cwnd * mss;
        }

        if (lost) {
            ssthresh = cwnd0 / 2;
            if (ssthresh < IKCP_THRESH_MIN) ssthresh = IKCP_THRESH_MIN;
            cwnd = 1;
            incr = mss;
        }

        if (cwnd < 1) {
            cwnd = 1;
            incr = mss;
        }
        seg.closeSegment();
        return minrto;
    }

    /**
     * update getState (call it repeatedly, every 10ms-100ms), or you can ask
     * ikcp_check when to call it again (without ikcp_input/_send calling).
     * 'current' - current timestamp in millisec.
     *
     * @param current
     */
    public void update(long current) {

        if (!updated) {
            updated = true;
            tsFlush = current;
        }

        int slap = itimediff(current, tsFlush);

        if (slap >= 10000 || slap < -10000) {
            tsFlush = current;
            slap = 0;
        }
        if (slap >= 0) {
            tsFlush += interval;
            if (itimediff(current, tsFlush) >= 0) {
                tsFlush = current + interval;
            }
        } else {
            tsFlush = current + interval;
        }
        flush(current);
    }

    /**
     * Determine when should you invoke ikcp_update:
     * returns when you should invoke ikcp_update in millisec, if there
     * is no ikcp_input/_send calling. you can call ikcp_update in that
     * time, instead of call update repeatly.
     * Important to reduce unnacessary ikcp_update invoking. use it to
     * schedule ikcp_update (eg. implementing an epoll-like mechanism,
     * or optimize ikcp_update when handling massive kcp connections)
     *
     * @param current
     * @return
     */
    public long check(long current) {
        if (!updated) {
            return current;
        }

        long tsFlush = this.tsFlush;
        int slap = itimediff(current, tsFlush);
        if (slap >= 10000 || slap < -10000) {
            tsFlush = current;
            slap = 0;
        }

        if (slap >= 0) {
            return current;
        }

        int tmFlush = itimediff(tsFlush, current);
        int tmPacket = Integer.MAX_VALUE;

        for (Iterator<Segment> itr = sndBufItr.rewind(); itr.hasNext(); ) {
            Segment seg = itr.next();
            int diff = itimediff(seg.resendts, current);
            if (diff <= 0) {
                return current;
            }
            if (diff < tmPacket) {
                tmPacket = diff;
            }
        }

        int minimal = tmPacket < tmFlush ? tmPacket : tmFlush;
        if (minimal >= interval) {
            minimal = interval;
        }

        return current + minimal;
    }

    public boolean checkFlush() {
        if (!acklist.isEmpty()) {
            return true;
        }
        if (probe != 0) {
            return true;
        }
        if (sndBuf.size() > 0) {
            return true;
        }
        if (sndQueue.size() > 0) {
            return true;
        }
        return false;
    }


    public int setMtu(int mtu) {
        if (mtu < IKCP_OVERHEAD || mtu < 50) {
            return -1;
        }
        this.buffer = new IoBuffer((mtu + IKCP_OVERHEAD) );
        this.mtu = mtu;
        this.mss = mtu - IKCP_OVERHEAD;
        return 0;
    }

    public int getInterval() {
        return interval;
    }

    public int nodelay(boolean nodelay, int interval, int resend, boolean nc) {
        this.nodelay = nodelay;
        if (nodelay) {
            this.rxMinrto = IKCP_RTO_NDL;
        } else {
            this.rxMinrto = IKCP_RTO_MIN;
        }

        if (interval >= 0) {
            if (interval > 5000) {
                interval = 5000;
            } else if (interval < 10) {
                interval = 10;
            }
            this.interval = interval;
        }

        if (resend >= 0) {
            fastresend = resend;
        }

        this.nocwnd = nc;

        return 0;
    }

    public int waitSnd() {
        return this.sndBuf.size() + this.sndQueue.size();
    }

    public int getConv() {
        return conv;
    }

    public void setConv(int conv) {
        this.conv = conv;
    }

    public Object getUser() {
        return user;
    }

    public void setUser(Object user) {
        this.user = user;
    }

    public int getState() {
        return state;
    }

    public void setState(int state) {
        this.state = state;
    }

    public boolean isNodelay() {
        return nodelay;
    }

    public void setNodelay(boolean nodelay) {
        this.nodelay = nodelay;
        if (nodelay) {
            this.rxMinrto = IKCP_RTO_NDL;
        } else {
            this.rxMinrto = IKCP_RTO_MIN;
        }
    }


    public void setFastresend(int fastresend) {
        this.fastresend = fastresend;
    }


    public void setRxMinrto(int rxMinrto) {
        this.rxMinrto = rxMinrto;
    }

    public void setRcvWnd(int rcvWnd) {
        this.rcvWnd = rcvWnd;
    }

    public void setAckMaskSize(int ackMaskSize) {
        this.ackMaskSize = ackMaskSize;
        this.IKCP_OVERHEAD+=(ackMaskSize/8);
        this.mss = mtu - IKCP_OVERHEAD-reserved;
    }

    public void setReserved(int reserved) {
        this.reserved = reserved;
        this.mss = mtu - IKCP_OVERHEAD-reserved;
    }


    public int getSndWnd() {
        return sndWnd;
    }

    public void setSndWnd(int sndWnd) {
        this.sndWnd = sndWnd;
    }

    public boolean isStream() {
        return stream;
    }

    public void setStream(boolean stream) {
        this.stream = stream;
    }

    public void setAckNoDelay(boolean ackNoDelay) {
        this.ackNoDelay = ackNoDelay;
    }

    @Override
    public String toString() {
        return "Kcp(" +
                "conv=" + conv +
                ')';
    }


    public static class Segment {

        /**会话id**/
        private int conv;
        /**命令**/
        private byte cmd;
        /**message中的segment分片ID（在message中的索引，由大到小，0表示最后一个分片）**/
        private byte frg;
        /**剩余接收窗口大小(接收窗口大小-接收队列大小)**/
        private short wnd;
        /**message发送时刻的时间戳**/
        private long ts;
        /**message分片segment的序号**/
        private long sn;
        /**待接收消息序号(接收滑动窗口左端)**/
        private long una;
        /**下次超时重传的时间戳**/
        private long resendts;
        /**该分片的超时重传等待时间**/
        private int rto;
        /**收到ack时计算的该分片被跳过的累计次数，即该分片后的包都被对方收到了，达到一定次数，重传当前分片**/
        private int fastack;
        /***发送分片的次数，每发送一次加一**/
        private int xmit;

        private long ackMask;

        private IoBuffer data;

        private static final ObjectPool<Kcp.Segment> pool = new ObjectPool() {
            @Override
            public PooledObject create() {
                return new PooledObject<Kcp.Segment>(new Kcp.Segment());
            }
        };

        static Segment createSegment() {
            return pool.getObject();
        }

        void closeSegment() {
            this.reset();
            pool.returnObject(this);
        }

        /**
         * encode a segment into buffer
         *
         * @return
         */
        public byte[] encode() {
            IoBuffer buf = new IoBuffer(Kcp.IKCP_OVERHEAD);
            buf.putInt(conv);
            buf.put(cmd);
            buf.put(frg);
            buf.putShort(wnd);
            buf.putInt((int)ts);
            buf.putInt((int)sn);
            buf.putInt((int)una);
            int dataSize = data==null ? 0 : data.readableBytes();
            buf.putInt(dataSize);

            return buf.array();
        }


        void reset() {
            conv = 0;
            cmd = 0;
            frg = 0;
            wnd = 0;
            ts = 0;
            sn = 0;
            una = 0;
            resendts = 0;
            rto = 0;
            fastack = 0;
            xmit = 0;
            ackMask=0;
            if(data != null) data.clear();
            data = null;
        }


        public long getResendts() {
            return resendts;
        }

        public void setResendts(long resendts) {
            this.resendts = resendts;
        }

        public int getXmit() {
            return xmit;
        }

        public void setXmit(int xmit) {
            this.xmit = xmit;
        }
    }

}
